package com.example.demoserver.mqtt;

import com.example.demoserver.entity.*;
import com.example.demoserver.entity.json.DataContainer;
import com.example.demoserver.entity.json.DataPoint;
import com.example.demoserver.service.*;
import com.example.demoserver.service.impl.InteDatapointServiceImpl;
import com.example.demoserver.service.impl.InteGatewayServiceImpl;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;


@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class TestMqttHandler extends ChannelInboundHandlerAdapter {

    private final InteDatapointService inteDatapointService;

    private final InteGatewayService inteGatewayService;

    private final InteAppstateService inteAppstateService;

    private final InteAppalarmService inteAppalarmService;

    private final InteGatewaylogService inteGatewaylogService;
    private static final Collection<Channel> clientList = new HashSet();
    private static final Map<String, Object> msgMap = new HashMap<>();
    public static List<List<String>> conList = new ArrayList<>();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof MqttMessage) {
            Channel channel = ctx.channel();
            MqttMessage message = (MqttMessage) msg;

            MqttMessageType messageType = message.fixedHeader().messageType();

            log.info("MQTT接收到的发送类型===》{}", messageType);
            switch (messageType) {
                // 建立连接
                case CONNECT:
                    try {
                        if (isOpen()) {
                            MqttConnectMessage m = (MqttConnectMessage) message;
                            String username = m.payload().userName();
                            String password = m.payload().passwordInBytes() != null
                                    ? new String(m.payload().passwordInBytes())
                                    : null;
                            if (isValid(username, password)) {
                                this.connect(channel, (MqttConnectMessage) message);
                                InteGateway inteGateway = inteGatewayService.selectByGIdAndPas(username, password);
                                if (inteGateway.getGatewaystate() == 1) {
                                    inteGateway.setGatewaystate(0);
                                    inteGatewayService.updateById(inteGateway);
                                    saveLog("上线", "无", "设备" + inteGateway.getGatewayname() + "上线");
                                }
                                List<String> l = new ArrayList<>();
                                l.add(channel.id().toString());
                                l.add(username);
                                l.add(password);
                                l.add(channel.remoteAddress().toString());
                                conList.add(l);
                            } else {
                                // Close the connection as the client is not authorized...
                                ctx.close();
                            }
                        }

                    } catch (Exception e) {
                        //如果用户密码，客户端ID校验不成功，会二次建立CONNECT类型连接
                        //但是没有实际意义
                        ctx.close();
                    }
                    break;
                // 发布消息
                case PUBLISH:
                    this.publish(channel, (MqttPublishMessage) message);
                    break;
                // 订阅主题
                case SUBSCRIBE:
                    this.subscribe(channel, (MqttSubscribeMessage) message);
                    break;
                // 退订主题
                case UNSUBSCRIBE:
                    this.unSubscribe(channel, (MqttUnsubscribeMessage) message);
                    break;
                // 心跳包
                case PINGREQ:
                    this.pingReq(channel, message);
                    break;
                // 断开连接
                case DISCONNECT:
                    if (isOpen()) {
                        for (int i = 0; i < conList.size(); i++) {
                            if (conList.get(i).get(0).equals(channel.id().toString())) {
                                InteGateway inteGateway = inteGatewayService.selectByGIdAndPas(conList.get(i).get(1), conList.get(i).get(2));
                                inteGateway.setGatewaystate(1);
                                inteGatewayService.updateById(inteGateway);
                                saveLog("离线", "无", "设备" + inteGateway.getGatewayname() + "离线");
                                conList.remove(i);
                            }
                        }
                        this.disConnect(channel, message);
                    }
                    break;
                // 确认收到响应报文,用于服务器向客户端推送qos1/qos2后，客户端返回服务器的响应
                case PUBACK:
                    this.puback(channel, message);
                    break;
                // qos2类型，发布收到
                case PUBREC:
                    this.pubrec(channel, message);
                    break;
                // qos2类型，发布释放响应
                case PUBREL:
                    this.pubrel(channel, message);
                    break;
                // qos2类型，发布完成
                case PUBCOMP:
                    this.pubcomp(channel, message);
                    break;
                default:
                    if (log.isDebugEnabled()) {
                        log.debug("Nonsupport server message  type of '{}'.", messageType);
                    }
                    break;
            }
        }
    }

    private boolean isValid(String username, String password) {
        // Validate the username and password...
        // This is a dummy check, replace it with your actual validation logic.
        InteGateway inteGateway = inteGatewayService.selectByGIdAndPas(username, password);
        if (inteGateway != null) {
            return true;
        }
        return false;
    }

    public boolean isOpen() {
        List<InteAppstate> list = inteAppstateService.selectAll();
        for (int i = 0; i < list.size(); i++) {
            if (list.get(i).getAppname().equals("设备管理")) {
                if (list.get(i).getAppstate() == 0) {
                    return true;
                }

            }
        }
        return false;
    }

    public void saveLog(String op, String type, String value) {
        InteGatewaylog inteGatewaylog = new InteGatewaylog();
        inteGatewaylog.setHtype(type);
        inteGatewaylog.setLogoperate(op);
        inteGatewaylog.setLogvalue(value);
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        String formatted = now.format(formatter);
        inteGatewaylog.setLogtime(formatted);
        inteGatewaylogService.save(inteGatewaylog);
    }

    /**
     * 创建连接时，需要响应对应的ACK包
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/13 23:12
     */
    public void connect(Channel channel, MqttConnectMessage msg) {
        //连接需要答复
        MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(
                        MqttMessageType.CONNACK, false, AT_LEAST_ONCE, false, 0),
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, true), null);
        channel.writeAndFlush(okResp);
        clientList.add(channel);
    }

    /**
     * 响应ping心跳ACK包
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/13 23:13
     */
    public void pingReq(Channel channel, MqttMessage msg) {
        if (log.isDebugEnabled()) {
            log.debug("MQTT pingReq received.");
        }
        MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false,
                AT_LEAST_ONCE, false, 0));
        channel.writeAndFlush(pingResp);
    }

    /**
     * 服务端主动断开连接
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/13 23:13
     */
    public void disConnect(Channel channel, MqttMessage msg) {
        clientList.remove(channel);
        if (channel.isActive()) {
            channel.close();
            if (log.isDebugEnabled()) {
                log.debug("MQTT channel '{}' was closed.", channel.id().asShortText());
            }
        }
    }

    /**
     * qos2中使用,发布确认
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/13 23:14
     */
    public void puback(Channel channel, MqttMessage msg) {

//        MqttMessageIdVariableHeader mqttMessageIdVariableHeader = msg.variableHeader();
    }

    /**
     * qos2中发布释放ACK包
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/13 23:13
     */
    public void pubrel(Channel channel, MqttMessage msg) {
        Object mqttMessageIdVariableHeader = msg.variableHeader();
        if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {
            // qos2类型,接收发布者消息
            log.info("qos2客户端返回确认的消息包:{}", msg.payload());
            MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;
            MqttMessage mqttMessage = MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.EXACTLY_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(header.messageId()),
                    0);
            channel.writeAndFlush(mqttMessage);
            for (Channel channel1 : clientList) {
                try {
                    send(channel1, "aaa", MqttQoS.EXACTLY_ONCE, "我收到那");
                } catch (InterruptedException e) {
                    log.error("该通道推送消息失败,可加入容错机制,channel:{}", channel1);
                }
            }
        }
    }

    /**
     * qos2:发布收到ACK包
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/14 0:09
     */
    public void pubrec(Channel channel, MqttMessage msg) {
        Object mqttMessageIdVariableHeader = msg.variableHeader();
        if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {
            // qos2类型,接收发布者消息
            log.info("qos2客户端返回确认的消息包:{}", msg.payload());
            MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;
            MqttMessage mqttMessage = MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.EXACTLY_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(header.messageId()),
                    0);
            channel.writeAndFlush(mqttMessage);
        }
    }

    /**
     * qos2发布完成
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/14 0:11
     */
    public void pubcomp(Channel channel, MqttMessage msg) {
        Object mqttMessageIdVariableHeader = msg.variableHeader();
        if (mqttMessageIdVariableHeader instanceof MqttPubReplyMessageVariableHeader) {
            // qos2类型,接收发布者消息
//            log.info("qos2客户端返回确认的消息包:{}",msg.payload());
//            MqttPubReplyMessageVariableHeader header = (MqttPubReplyMessageVariableHeader) mqttMessageIdVariableHeader;
//            MqttMessage mqttMessage = MqttMessageFactory.newMessage(
//                    new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.EXACTLY_ONCE, false, 0),
//                    MqttMessageIdVariableHeader.from(header.messageId()),
//                    0);
//            channel.writeAndFlush(mqttMessage);
        }
    }

    public boolean checked(Collection info){
        if (info instanceof Collection) {
            Collection<?> collection = (Collection<?>) info;
            if (!collection.isEmpty()) {
                return false;
            } else {
                return true;
            }
        } else {
            return false;
        }
    }
    /**
     * 客户端发布消息时使用
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/13 23:14
     */
    public void publish(Channel channel, MqttPublishMessage msg) {
        log.info("qos类型是{}", msg.fixedHeader().qosLevel());
        String topic = msg.variableHeader().topicName();
        log.info("订阅主题:{}", topic);
        if (topic.equals("PushGateway")) {
            ByteBuf buf = msg.content().duplicate();
            byte[] tmp = new byte[buf.readableBytes()];
            buf.readBytes(tmp);
            String content = null;
            try {
                content = new String(tmp, "UTF-8");
                System.out.println(content);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            List<String> nowCon=new ArrayList<>();
            for(int i=0;i<conList.size();i++){
                if(conList.get(i).get(0).equals(channel.id().toString())){
                    nowCon=conList.get(i);
                }
            }
            LocalDateTime now = LocalDateTime.now();
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            String formatted = now.format(formatter);

            Gson gson = new Gson();
            Type type = new TypeToken<DataContainer>() {
            }.getType();
            DataContainer data = null;

            try {
                data = gson.fromJson(content, type);
            } catch (JsonSyntaxException e) {
                InteAppalarm inteAppalarm = new InteAppalarm();
                inteAppalarm.setAlarminfo("MQTT数据格式错误");
                inteAppalarm.setAlarmtime(formatted);
                inteAppalarm.setAlarmtype(2);
                inteAppalarmService.save(inteAppalarm);
                return;
            }
            if (StringUtils.isBlank(content)||checked(data.getDataPoints())) {
                log.error("MQTT接收到的数据包为空===》{}", content);
                InteAppalarm inteAppalarm = new InteAppalarm();
                inteAppalarm.setAlarminfo("设备"+nowCon.get(1)+"接收到的数据为空");
                inteAppalarm.setAlarmtime(formatted);
                inteAppalarm.setAlarmtype(2);
                inteAppalarmService.save(inteAppalarm);
                puback(channel, msg, "MQTT接收到的数据包为空");
                return;
            }
            InteGateway inteGateway = inteGatewayService.selectByGatewayId(data.getGatewayId());
            if(inteGateway!=null){
                if (nowCon.get(1).equals(data.getGatewayId())) {
                    for (DataPoint dp : data.getDataPoints()) {
                        InteDatapoint inteDatapoint = new InteDatapoint();
                        inteDatapoint.setGatewayid(inteGateway.getId());
                        inteDatapoint.setIsbool(dp.getIsBool() ? 1 : 0);
                        inteDatapoint.setPointtag(dp.getTag());
                        inteDatapoint.setPointunit(dp.getUnit());
                        inteDatapoint.setPointvalue(dp.getValue());
                        inteDatapoint.setUpdatetime(data.getUploadTime());
                        inteDatapointService.save(inteDatapoint);
                    }
                    log.info("MQTT读取到的客户端发送信息===>{}", content);
                    // 如果是qos1或者qos2类型都需要响应
                    puback(channel, msg, content);
                    // 推送主题消息
                    log.info("推送客户端客户端消息：{}", content);
                    if (AT_LEAST_ONCE == msg.fixedHeader().qosLevel() || AT_MOST_ONCE == msg.fixedHeader().qosLevel()) {
                        for (Channel channel1 : clientList) {
                            try {
                                send(channel1, topic, msg.fixedHeader().qosLevel(), content);
                            } catch (InterruptedException e) {
                                log.error("该通道推送消息失败,可加入容错机制,channel:{}", channel1);
                            }
                        }
                    }
                } else {
                    InteAppalarm inteAppalarm = new InteAppalarm();
                    inteAppalarm.setAlarminfo(nowCon.get(1) + "网关推送数据中的网关id为" + inteGateway.getGatewayid());
                    inteAppalarm.setAlarmtime(formatted);
                    inteAppalarm.setAlarmtype(2);
                    inteAppalarmService.save(inteAppalarm);
                }
            }

//            for (int i = 0; i < conList.size(); i++) {
//                if (conList.get(i).get(0).equals(channel.id().toString())) {
//                    if (conList.get(i).get(1).equals(inteGateway.getGatewayid()) && conList.get(i).get(2).equals(inteGateway.getMqttpassword())) {
//                        for (DataPoint dp : data.getDataPoints()) {
//                            InteDatapoint inteDatapoint = new InteDatapoint();
//                            inteDatapoint.setGatewayid(inteGateway.getId());
//                            inteDatapoint.setIsbool(dp.getIsBool() ? 1 : 0);
//                            inteDatapoint.setPointtag(dp.getTag());
//                            inteDatapoint.setPointunit(dp.getUnit());
//                            inteDatapoint.setPointvalue(dp.getValue());
//                            inteDatapoint.setUpdatetime(data.getUploadTime());
//                            inteDatapointService.save(inteDatapoint);
//                        }
//                        log.info("MQTT读取到的客户端发送信息===>{}", content);
//                        // 如果是qos1或者qos2类型都需要响应
//                        puback(channel, msg, content);
//
//                        // 推送主题消息
//                        log.info("推送客户端客户端消息：{}", content);
//                        if (AT_LEAST_ONCE == msg.fixedHeader().qosLevel() || AT_MOST_ONCE == msg.fixedHeader().qosLevel()) {
//                            for (Channel channel1 : clientList) {
//                                try {
//                                    send(channel1, topic, msg.fixedHeader().qosLevel(), content);
//                                } catch (InterruptedException e) {
//                                    log.error("该通道推送消息失败,可加入容错机制,channel:{}", channel1);
//                                }
//                            }
//                        }
//                    } else {
//                        InteAppalarm inteAppalarm = new InteAppalarm();
//                        inteAppalarm.setAlarminfo(conList.get(i).get(1) + "网关推送数据网关id为" + inteGateway.getGatewayid());
//                        inteAppalarm.setAlarmtime(formatted);
//                        inteAppalarm.setAlarmdevice(gatewayId);
//                        inteAppalarmService.save(inteAppalarm);
//                    }
//                }
//            }
        }

    }

    /**
     * 客户端订阅消息ACK包
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/13 23:14
     */
    public void subscribe(Channel channel, MqttSubscribeMessage msg) {
        MqttQoS mqttQoS = msg.fixedHeader().qosLevel();
//        mqttQoS = MqttQoS.EXACTLY_ONCE;
        MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.SUBACK, false, mqttQoS, false, 0),
                MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
                new MqttSubAckPayload(0));
        channel.writeAndFlush(subAckMessage);
    }

    /**
     * 客户端取消订阅ACK包
     *
     * @param channel:
     * @param msg:
     * @author liyang
     * @since 2022/3/13 23:15
     */
    public void unSubscribe(Channel channel, MqttUnsubscribeMessage msg) {

        MqttUnsubAckMessage unSubAckMessage = (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0),
                MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
        channel.writeAndFlush(unSubAckMessage);
        disConnect(channel, msg);
    }

    /**
     * 捕获异常状态，客户端断开钩子函数
     *
     * @param ctx:
     * @param cause:
     * @author liyang
     * @since 2022/3/13 23:15
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("MQTT客户端被强制关闭:{}:{}", ctx.channel().id().asShortText(), cause);
        if (ctx.channel().isActive()) {
            ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            ctx.channel().close();
        }
    }

    /**
     * qos1中响应客户端ACK包
     *
     * @param channel:
     * @param msg:
     * @param payLoad:
     * @author liyang
     * @since 2022/3/13 23:16
     */
    // 客户端QOS1消息类型( MqttQoS.AT_LEAST_ONCE = qos1)，需要服务器响应包
    private void puback(Channel channel, MqttPublishMessage msg, String payLoad) {

        if (MqttQoS.AT_MOST_ONCE == msg.fixedHeader().qosLevel()) {
            // qos0消息类型,不需要ACK客户端
            return;
        }
        if (MqttQoS.AT_LEAST_ONCE == msg.fixedHeader().qosLevel()) {
            // qos1消息类型,需要向客户端返回MqttMessageType.PUBACK 类型ACK应答
            MqttPubAckMessage sendMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()),
                    payLoad);
            channel.writeAndFlush(sendMessage);
            return;
        }
        if (MqttQoS.EXACTLY_ONCE == msg.fixedHeader().qosLevel()) {
            // qos2消息类型
            MqttMessage mqttMessage = MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.EXACTLY_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()),
                    payLoad);

            channel.writeAndFlush(mqttMessage);
        }
    }

    /**
     * 向客户端发布订阅主题消息
     *
     * @param channel:
     * @param topic:
     * @param qos:
     * @param sendMessage:
     * @return
     * @author liyang
     * @since 2022/3/13 23:16
     */
    public ChannelFuture send(Channel channel, String topic, MqttQoS qos, String sendMessage) throws InterruptedException {
        MqttRequest request = new MqttRequest((sendMessage.getBytes()));
        MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.PUBLISH,
                        request.isDup(),
                        qos,
                        request.isRetained(),
                        0),
                new MqttPublishVariableHeader(topic, 0),
                Unpooled.buffer().writeBytes(request.getPayload()));
        msgMap.put(pubMessage.variableHeader().messageId() + "", pubMessage.variableHeader().messageId() + "");
        // 超过高水位，则采取同步模式
        if (channel.isWritable()) {
            return channel.writeAndFlush(pubMessage);
        }
        return channel.writeAndFlush(pubMessage).sync();
    }
}